﻿/**
* CRL
*/
using CRL.EventBus.Queue;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Text;

namespace CRL.EventBus
{
    public class QueueFactory
    {
        static ConcurrentDictionary<string, AbsQueue> clients = new ConcurrentDictionary<string, AbsQueue>();
        public static AbsQueue GetSubQueueClient(QueueConfig config, EventDeclare eventDeclare)
        {
            var _queueName = "default";
            var attr = eventDeclare.SubscribeAttribute;
            if(!string.IsNullOrEmpty(attr.QueueName))
            {
                _queueName = attr.QueueName;
            }
            var setting = config.GetMqSetting(attr.MQType);
            var key = $"CRL_{setting.MQType}_{_queueName}";
            if (setting.MQType == MQType.RabbitMQ)
            {
                key = $"CRL_{setting.MQType}_{_queueName}_{eventDeclare.IsAsync}";//ֻ����һ������
            }
            var a = clients.TryGetValue(key, out AbsQueue client);
            if (!a)
            {
                client = CreateClient(setting, eventDeclare.IsAsync);
                client.Name = key;
                clients.TryAdd(key, client);
            }
            return client;
        }
        public static AbsQueue CreateClient(ConfigBase config, bool async)
        {
            return config.InstanceType.Invoke(config, async);
        }
        public static void DisposeAll()
        {
            foreach (var kv in clients)
            {
                var d = kv.Value as IDisposable;
                d.Dispose();
            }
        }
        internal static void ConfirmConsume()
        {
            foreach (var kv in clients)
            {
                kv.Value.ConfirmConsume();
            }
        }
    }
}
